-
Notifications
You must be signed in to change notification settings - Fork 3
Annex.py: Update hash algorithm and switch to UNIX timestamps #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
qa-cea
commented
Aug 27, 2025
- Switch from MD5 to SHA3-256 to avoid any risk of collision. MD5 is still supported for retrieving from the annex, pushing is only using SHA3-256
- Switch to %c date format to a UNIX timestamp to allow collaboration with people with different date format / timezome. The old format is supported for reading, the push only UNIX timestamp
# MD5 | ||
if meta.st_size == 32: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: is there no other way to know the type of hash being used ? Feels really fragile
if meta.st_size == 32: | ||
logging.warning("Using deprecated hash algorithm (MD5)") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: don't think adding this warning is important, since it will be normal to have it until all file paths are migrated to sha3, at which point this shouldn't occur anymore
if meta.st_size == 32: | ||
logging.warning("Using deprecated hash algorithm (MD5)") | ||
with open(filepath, encoding='utf-8') as fh: | ||
identifier = fh.read(32) | ||
return all(byte in string.hexdigits for byte in identifier) | ||
|
||
# SHA3 256 | ||
elif meta.st_size == 64: | ||
with open(filepath, encoding='utf-8') as fh: | ||
identifier = fh.read(64) | ||
return all(byte in string.hexdigits for byte in identifier) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: merge the two conditions
raise ValueError(f"Invalid date format in metadata: {insertion_time}") | ||
|
||
# UNIX timestamp | ||
elif isinstance(insertion_time, (int, float)): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: is the float type really possible ? Seems unlikely for a timestamp
elif isinstance(insertion_time, (int, float)): | ||
insertion_time = insertion_time | ||
else: | ||
raise ValueError("Invalid date format in metadata") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggest: print the type of the object
|
||
# UNIX timestamp | ||
elif isinstance(insertion_time, (int, float)): | ||
insertion_time = insertion_time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
major: don't think this line is really necessary
@@ -226,13 +236,29 @@ def list(self): | |||
if not filename.endswith('.info'): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggest: change the condition to check it ends with a .info
and continue
if true
if os.path.exists(destpath): | ||
destinfo = os.stat(destpath) | ||
if destinfo and destinfo.st_size == originfo.st_size and \ | ||
filename in metadata.get('filenames', {}): | ||
logging.debug('%s is already into annex, skipping it', filename) | ||
|
||
else: | ||
# Update them and write them back | ||
fileset = metadata.setdefault('filenames', {}) | ||
fileset.setdefault(filename, {}) | ||
fileset[filename]['date'] = time.strftime("%c") | ||
self._save_metadata(digest, metadata) | ||
|
||
# Move binary file to annex | ||
logging.debug('Importing %s into annex (%s)', filepath, digest) | ||
shutil.copyfile(filepath, destpath) | ||
os.chmod(destpath, self.WMODE) | ||
if destinfo and destinfo.st_size == originfo.st_size and \ | ||
filename in metadata.get('filenames', {}): | ||
logging.debug('%s is already into annex, skipping it', filename) | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: shouldn't we return an error or at least continue if os.path.exists(destpath)
is false ?